Image De-Duplication

                                

                                    # Image De-Duplication
                                    # Author: Robert Swetland
                                    # Date: 2025

                                    # Prerequisites: Python 3
                                    # Required third-party packages: OpenCV, NumPy, scikit-learn, ImageHash, Pillow (PIL)
                                    # Standard-library modules used: pathlib, argparse, collections, logging, time, xml.etree.ElementTree, hashlib

                                    # Required libraries
                                    # In a terminal window paste the following
                                    # pip install opencv-python numpy scikit-learn imagehash Pillow

                                    # To run the script paste the following into a terminal window
                                    # Specify the target folder as well as the output log
                                    # python filecompare.py "Path\To\Image Files" --threshold 0.95 --output my_results.txt

                                    # The threshold parameter (0.0 to 1.0) controls how similar images 
                                    # need to be to be considered duplicates. A higher threshold means 
                                    # images need to be more similar to be marked as duplicates.


                                    import cv2
                                    import numpy as np
                                    from pathlib import Path
                                    from sklearn.metrics.pairwise import cosine_similarity
                                    import imagehash
                                    from PIL import Image
                                    import argparse
                                    from collections import defaultdict
                                    import logging
                                    import time
                                    import xml.etree.ElementTree as ET
                                    import hashlib

                                    # Configure the root logger once at import time: INFO level by default
                                    # (main() switches it to DEBUG when --debug is given), with records
                                    # formatted as "<timestamp> - <level> - <message>".
                                    logging.basicConfig(
                                        level=logging.INFO,
                                        format='%(asctime)s - %(levelname)s - %(message)s',
                                        datefmt='%Y-%m-%d %H:%M:%S'
                                    )

                                    def normalize_svg(file_path):
                                        """
                                        Produce a canonical text form of an SVG file for comparison.

                                        Every element's attributes are re-ordered alphabetically by name and
                                        all whitespace is stripped from the serialized document, so two files
                                        that differ only in attribute order or formatting normalize to the
                                        same string.

                                        Args:
                                            file_path: Path to the SVG file

                                        Returns:
                                            The normalized SVG content as a string, or None if the file
                                            could not be read or parsed (the error is logged)
                                        """
                                        try:
                                            svg_root = ET.parse(file_path).getroot()

                                            # iter() yields the root and every descendant; each element's
                                            # attributes are sorted independently, so visit order is irrelevant.
                                            for node in svg_root.iter():
                                                ordered_attrs = sorted(node.attrib.items())
                                                node.attrib.clear()
                                                node.attrib.update(ordered_attrs)

                                            # Serialize, then drop every whitespace character.
                                            serialized = ET.tostring(svg_root, encoding='unicode')
                                            return ''.join(serialized.split())
                                        except Exception as err:
                                            logging.error(f"Error processing SVG {file_path}: {str(err)}")
                                            return None

                                    def compute_image_features(image_path):
                                        """
                                        Compute comparison features for an image file (raster or SVG).

                                        SVG files are reduced to an MD5 digest of their normalized markup.
                                        Raster images get three perceptual-style hashes (phash, ahash, dhash)
                                        plus a flattened, normalized 8x8x8 RGB colour histogram.

                                        Args:
                                            image_path: Path object pointing to the image file

                                        Returns:
                                            For SVGs: {'type': 'svg', 'content_hash', 'content'}.
                                            For raster images: {'type': 'raster', 'phash', 'ahash', 'dhash',
                                            'histogram'}.
                                            None if the image cannot be processed (the error is logged).
                                        """
                                        try:
                                            if image_path.suffix.lower() == '.svg':
                                                logging.debug(f"Processing SVG file: {image_path}")
                                                content = normalize_svg(image_path)
                                                if content is None:
                                                    return None

                                                # MD5 is used only as a content fingerprint, not for security.
                                                content_hash = hashlib.md5(content.encode()).hexdigest()
                                                return {
                                                    'type': 'svg',
                                                    'content_hash': content_hash,
                                                    'content': content
                                                }

                                            logging.debug(f"Processing raster image: {image_path}")

                                            # The context manager releases the file handle as soon as the
                                            # hashes are computed instead of leaving it to garbage collection.
                                            with Image.open(image_path) as pil_img:
                                                # phash must use the perceptual hash; computing it with
                                                # average_hash would make it a mere copy of ahash.
                                                phash = str(imagehash.phash(pil_img))
                                                ahash = str(imagehash.average_hash(pil_img))
                                                dhash = str(imagehash.dhash(pil_img))

                                            cv_img = cv2.imread(str(image_path))
                                            if cv_img is None:
                                                logging.warning(f"Failed to read image with OpenCV: {image_path}")
                                                return None

                                            # OpenCV loads BGR; convert so the histogram channels are R, G, B.
                                            cv_img = cv2.cvtColor(cv_img, cv2.COLOR_BGR2RGB)
                                            hist = cv2.calcHist([cv_img], [0, 1, 2], None, [8, 8, 8], [0, 256, 0, 256, 0, 256])
                                            hist = cv2.normalize(hist, hist).flatten()

                                            return {
                                                'type': 'raster',
                                                'phash': phash,
                                                'ahash': ahash,
                                                'dhash': dhash,
                                                'histogram': hist
                                            }

                                        except Exception as e:
                                            logging.error(f"Error processing image {image_path}: {str(e)}")
                                            return None

                                    def compare_features(features1, features2, similarity_threshold=0.95):
                                        """
                                        Decide whether two feature sets describe duplicate images.

                                        Args:
                                            features1: Feature dictionary of the first image
                                            features2: Feature dictionary of the second image
                                            similarity_threshold: Histogram cosine similarity above which two
                                                raster images count as duplicates

                                        Returns:
                                            Truthy value if the images are considered duplicates, falsy otherwise
                                        """
                                        kind1 = features1['type']
                                        kind2 = features2['type']

                                        # An SVG and a raster image are never treated as duplicates.
                                        if kind1 != kind2:
                                            return False

                                        # Two SVGs match only when their normalized content hashes are equal.
                                        if kind1 == 'svg':
                                            return features1['content_hash'] == features2['content_hash']

                                        # Raster images: any single matching hash is enough ...
                                        hashes_agree = any(
                                            features1[key] == features2[key]
                                            for key in ('phash', 'ahash', 'dhash')
                                        )

                                        # ... otherwise fall back to colour-histogram cosine similarity.
                                        vector1 = features1['histogram'].reshape(1, -1)
                                        vector2 = features2['histogram'].reshape(1, -1)
                                        hist_similarity = cosine_similarity(vector1, vector2)[0][0]

                                        return hashes_agree or hist_similarity > similarity_threshold

                                    def find_duplicates(folder_path, similarity_threshold=0.95):
                                        """
                                        Find duplicate images in the specified folder and all its subfolders.
                                        Handles both raster images and SVG files.

                                        File extensions are matched case-insensitively on every platform, so
                                        "photo.JPG" is picked up just like "photo.jpg".

                                        Args:
                                            folder_path: Path to the root folder containing images
                                            similarity_threshold: Threshold for considering images as duplicates (0.0 to 1.0)

                                        Returns:
                                            Dictionary mapping group IDs (0, 1, 2, ...) to lists of duplicate
                                            image paths. Each list starts with the first image of the group,
                                            followed by its duplicates in discovery order. An empty dictionary
                                            is returned when the folder is missing or cannot be resolved.
                                        """
                                        start_time = time.time()

                                        try:
                                            # Convert to absolute path and resolve any special characters
                                            folder_path = Path(folder_path).absolute().resolve()
                                            if not folder_path.exists():
                                                logging.error(f"Folder not found: {folder_path}")
                                                return {}
                                            logging.info(f"Resolved path: {folder_path}")
                                        except Exception as e:
                                            logging.error(f"Error processing folder path: {str(e)}")
                                            return {}

                                        # Collect all image files
                                        logging.info("Collecting image files from all folders...")
                                        supported_formats = ('*.jpg', '*.jpeg', '*.png', '*.bmp', '*.gif', '*.svg')

                                        # Walk the tree once and bucket files by lower-cased suffix. Globbing
                                        # per pattern is case-sensitive on POSIX and would miss e.g. ".JPG".
                                        files_by_suffix = {pattern[1:]: [] for pattern in supported_formats}
                                        for candidate in folder_path.rglob('*'):
                                            bucket = files_by_suffix.get(candidate.suffix.lower())
                                            if bucket is None:
                                                continue
                                            processed_path = process_file_path(candidate)
                                            if processed_path:
                                                bucket.append(processed_path)

                                        image_files = []
                                        for ext in supported_formats:
                                            valid_files = files_by_suffix[ext[1:]]
                                            image_files.extend(valid_files)
                                            logging.info(f"Found {len(valid_files)} valid files with extension {ext}")

                                        logging.info(f"Total valid images found: {len(image_files)}")

                                        # Compute features for all images
                                        features_dict = {}
                                        logging.info("Computing image features...")
                                        for i, img_path in enumerate(image_files, 1):
                                            logging.info(f"Processing image {i}/{len(image_files)}: {img_path}")
                                            features = compute_image_features(img_path)
                                            if features is not None:
                                                features_dict[img_path] = features

                                        # Find duplicates
                                        duplicate_groups = defaultdict(list)
                                        processed_images = set()
                                        candidates = list(features_dict.items())

                                        logging.info("Starting duplicate detection...")
                                        total_comparisons = len(candidates) * (len(candidates) - 1) // 2
                                        comparison_count = 0

                                        for i, (img1_path, features1) in enumerate(candidates):
                                            if img1_path in processed_images:
                                                continue

                                            # A list (not a set) keeps group order deterministic between runs.
                                            current_group = [img1_path]

                                            # Only look at later images: the comparison is symmetric, and any
                                            # earlier image that is still ungrouped was already compared with
                                            # this one and did not match. This keeps comparison_count within
                                            # total_comparisons, so progress never exceeds 100%.
                                            for img2_path, features2 in candidates[i + 1:]:
                                                if img2_path in processed_images:
                                                    continue

                                                comparison_count += 1
                                                if comparison_count % 1000 == 0:
                                                    progress = (comparison_count / total_comparisons) * 100
                                                    logging.info(f"Progress: {progress:.1f}% ({comparison_count}/{total_comparisons} comparisons)")

                                                if compare_features(features1, features2, similarity_threshold):
                                                    current_group.append(img2_path)

                                            if len(current_group) > 1:
                                                group_id = len(duplicate_groups)
                                                duplicate_groups[group_id] = current_group
                                                processed_images.update(current_group)

                                        elapsed_time = time.time() - start_time
                                        logging.info(f"Processing completed in {elapsed_time:.2f} seconds")
                                        logging.info(f"Found {len(duplicate_groups)} groups of duplicate images")

                                        return duplicate_groups

                                    def sanitize_path(path_str):
                                        """
                                        Turn a path string into a validated, absolute Path.

                                        Paths containing special characters such as @, spaces or unicode
                                        are accepted; an @ in the resolved path is merely logged.

                                        Args:
                                            path_str: String representation of the path

                                        Returns:
                                            Resolved pathlib Path object, or None if the path does not
                                            exist or cannot be interpreted
                                        """
                                        try:
                                            resolved = Path(path_str).absolute().resolve()

                                            # Informational only: note paths that contain an @ symbol.
                                            if '@' in str(resolved):
                                                logging.info(f"Path contains @ symbol: {resolved}")

                                            if resolved.exists():
                                                return resolved

                                            logging.error(f"Path does not exist: {resolved}")
                                            return None
                                        except Exception as err:
                                            logging.error(f"Invalid path: {path_str} - Error: {str(err)}")
                                            return None

                                    def process_file_path(file_path):
                                        """
                                        Validate a single candidate file path.

                                        Args:
                                            file_path: Path object for the file

                                        Returns:
                                            The same Path object if it refers to an existing regular file,
                                            otherwise None (a warning or error is logged)
                                        """
                                        try:
                                            # Debug aid only: note paths that contain an @ symbol.
                                            if '@' in str(file_path):
                                                logging.debug(f"Processing file with @ symbol: {file_path}")

                                            # Accept the path only if it is an existing regular file.
                                            if file_path.is_file():
                                                return file_path

                                            logging.warning(f"File not found: {file_path}")
                                            return None
                                        except Exception as err:
                                            logging.error(f"Error processing file path: {file_path} - {str(err)}")
                                            return None

                                    def write_results_to_file(duplicate_groups, output_file="duplicate_results.txt"):
                                        """
                                        Save the duplicate detection results as a plain-text report.

                                        Args:
                                            duplicate_groups: Dictionary mapping integer group IDs to lists of paths
                                            output_file: Path to output file (default: duplicate_results.txt)
                                        """
                                        try:
                                            with open(output_file, 'w', encoding='utf-8') as report:
                                                if duplicate_groups:
                                                    report.write(f"Found {len(duplicate_groups)} groups of duplicate images\n")
                                                    report.write("=" * 50 + "\n\n")

                                                    # Group IDs are zero-based internally, one-based in the report.
                                                    for group_id, members in duplicate_groups.items():
                                                        report.write(f"Group {group_id + 1}:\n")
                                                        report.writelines(f"  {member}\n" for member in members)
                                                        report.write("\n")
                                                else:
                                                    report.write("No duplicate images found.\n")
                                                    # Nothing more to report; the success log below is skipped.
                                                    return

                                            logging.info(f"Results written to: {output_file}")
                                        except Exception as err:
                                            logging.error(f"Error writing results to file: {str(err)}")

                                    def main():
                                        """
                                        Command-line entry point: parse arguments and run the duplicate detection.

                                        An unquoted folder path containing spaces is re-assembled from the
                                        leading arguments, so both quoted and unquoted paths work. Results are
                                        written to the output file and echoed to the console.

                                        Command-line arguments:
                                            folder_path: Root folder containing the images
                                            --threshold: Similarity threshold for raster images, 0.0 to 1.0
                                                (default 0.95); values outside that range are rejected
                                            --output: Result file path (default duplicate_results.txt)
                                            --debug: Enable debug logging
                                        """
                                        import sys

                                        # Pre-process arguments to handle unquoted paths with spaces
                                        args_list = sys.argv[1:]
                                        if args_list:
                                            # Find the first option (a "--flag" or the "-h" help switch) or the
                                            # end of the list. Only "-h" is recognised among single-dash
                                            # arguments so that path fragments such as "-" stay part of the path.
                                            first_flag_index = next(
                                                (i for i, arg in enumerate(args_list)
                                                 if arg.startswith('--') or arg == '-h'),
                                                len(args_list)
                                            )
                                            # Join all arguments before the first option as a single path
                                            if first_flag_index > 0:
                                                joined_path = ' '.join(args_list[:first_flag_index])
                                                args_list = [joined_path] + args_list[first_flag_index:]

                                        # Backslashes are escaped explicitly: sequences such as "\F" or "\@"
                                        # in a normal string literal are invalid escapes and trigger warnings.
                                        epilog = (
                                            'Examples:\n'
                                            '# Path with spaces and special characters (either format works):\n'
                                            'python filecompare.py "E:\\Files\\My Photos\\@ New Folder" --threshold 0.95\n'
                                            'python filecompare.py E:\\Files\\My Photos\\@ New Folder --threshold 0.95\n'
                                            '\n'
                                            '# Simple path:\n'
                                            'python filecompare.py E:\\Files\\Photos --threshold 0.95\n'
                                        )

                                        parser = argparse.ArgumentParser(
                                            description='Find duplicate images (including SVGs) in a folder and its subfolders',
                                            formatter_class=argparse.RawDescriptionHelpFormatter,
                                            epilog=epilog)
                                        parser.add_argument('folder_path',
                                                            help='Path to the root folder containing images')
                                        parser.add_argument('--threshold', type=float, default=0.95,
                                                            help='Similarity threshold (0.0 to 1.0) for raster images')
                                        parser.add_argument('--output', type=str, default='duplicate_results.txt',
                                                            help='Output file path for results (default: duplicate_results.txt)')
                                        parser.add_argument('--debug', action='store_true',
                                                            help='Enable debug logging')
                                        try:
                                            # Note: on invalid input argparse normally prints usage and exits
                                            # via SystemExit, which is deliberately not caught here.
                                            args = parser.parse_args(args_list)
                                        except argparse.ArgumentError as e:
                                            logging.error(f"Error parsing arguments: {str(e)}")
                                            return
                                        except Exception as e:
                                            logging.error(f"Unexpected error: {str(e)}")
                                            return

                                        # Reject thresholds outside the documented range (also catches NaN),
                                        # which would otherwise silently match everything or nothing.
                                        if not 0.0 <= args.threshold <= 1.0:
                                            parser.error(f"--threshold must be between 0.0 and 1.0 (got {args.threshold})")

                                        if args.debug:
                                            logging.getLogger().setLevel(logging.DEBUG)

                                        logging.info(f"Starting duplicate image detection in: {args.folder_path}")
                                        logging.info(f"Similarity threshold: {args.threshold}")

                                        duplicate_groups = find_duplicates(args.folder_path, args.threshold)

                                        # Write results to file and display them
                                        write_results_to_file(duplicate_groups, args.output)

                                        # Also display results in console
                                        if not duplicate_groups:
                                            logging.info("No duplicate images found.")
                                            return

                                        print("\nFound duplicate groups:")
                                        for group_id, group in duplicate_groups.items():
                                            print(f"\nGroup {group_id + 1}:")
                                            for img_path in group:
                                                print(f"  {img_path}")

                                        logging.info("Duplicate detection completed successfully")

                                    # Run the command-line interface only when this file is executed as a
                                    # script, not when it is imported as a module.
                                    if __name__ == "__main__":
                                        main()